{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create the entry points to Spark.\n",
    "# Stop any SparkContext left over from a previous run of this notebook.\n",
    "try:\n",
    "    sc.stop()\n",
    "except NameError:\n",
    "    pass  # no SparkContext has been created yet\n",
    "from pyspark import SparkContext\n",
    "from pyspark.sql import SparkSession\n",
    "sc = SparkContext()\n",
    "spark = SparkSession(sparkContext=sc)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Example DataFrame"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "mtcars = spark.read.csv(path='../../data/mtcars.csv',\n",
    "                        sep=',',\n",
    "                        encoding='UTF-8',\n",
    "                        comment=None,\n",
    "                        header=True, \n",
    "                        inferSchema=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## DataFrame to RDD\n",
    "A **DataFrame** can be converted to an **RDD** through its `rdd` attribute. Note that `pyspark.sql.DataFrame.rdd` is a property, not a method, so it is written without parentheses. Each element of the returned RDD is a **pyspark.sql.Row** object. A Row is an ordered collection of named fields, so its values can be read by field name or by position."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(_c0='Mazda RX4', mpg=21.0, cyl=6, disp=160.0, hp=110, drat=3.9, wt=2.62, qsec=16.46, vs=0, am=1, gear=4, carb=4),\n",
       " Row(_c0='Mazda RX4 Wag', mpg=21.0, cyl=6, disp=160.0, hp=110, drat=3.9, wt=2.875, qsec=17.02, vs=0, am=1, gear=4, carb=4)]"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "mtcars.rdd.take(2)"
   ]
  },
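  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a small illustration of what a Row holds, the sketch below builds one by hand and reads it by field name and by attribute, then converts it to a plain dictionary with `asDict()`. The three field values are copied from the first row shown above; the variable names (`row`, `mpg_by_key`, `mpg_by_attr`, `row_as_dict`) are ours and only serve the example."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import Row\n",
    "\n",
    "# A hand-built Row; the values are copied from the first mtcars row shown above.\n",
    "row = Row(model='Mazda RX4', mpg=21.0, cyl=6)\n",
    "\n",
    "# A field can be read by name (like a dictionary key) or as an attribute.\n",
    "mpg_by_key = row['mpg']\n",
    "mpg_by_attr = row.mpg\n",
    "\n",
    "# asDict() returns a plain Python dict of field name -> value.\n",
    "row_as_dict = row.asDict()\n",
    "print(mpg_by_key, mpg_by_attr, row_as_dict['model'])"
   ]
  },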
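  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Once we have an RDD, we can apply its transformation methods, such as **map**, **mapValues**, **flatMap**, and **flatMapValues**. Note that **mapValues** and **flatMapValues** expect an RDD of key-value pairs (two-element tuples), which is why we first use **map** to build `(name, mpg)` pairs."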
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('Mazda RX4', 21.0),\n",
       " ('Mazda RX4 Wag', 21.0),\n",
       " ('Datsun 710', 22.8),\n",
       " ('Hornet 4 Drive', 21.4),\n",
       " ('Hornet Sportabout', 18.7)]"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "mtcars_map = mtcars.rdd.map(lambda x: (x['_c0'], x['mpg']))\n",
    "mtcars_map.take(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('Mazda RX4', [21.0, 210.0]),\n",
       " ('Mazda RX4 Wag', [21.0, 210.0]),\n",
       " ('Datsun 710', [22.8, 228.0]),\n",
       " ('Hornet 4 Drive', [21.4, 214.0]),\n",
       " ('Hornet Sportabout', [18.7, 187.0])]"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "mtcars_mapvalues = mtcars_map.mapValues(lambda x: [x, x * 10])\n",
    "mtcars_mapvalues.take(5)"
   ]
  },
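  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The text above also names **flatMapValues**, which the cells so far do not show. The following is a minimal sketch of how it differs from **mapValues**. To stay self-contained it uses a tiny hand-made pair RDD instead of `mtcars_map`, and it assumes a local Spark installation; the names `pairs`, `nested` and `flat` are ours."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark import SparkContext\n",
    "\n",
    "# Reuse the running SparkContext if there is one (assumption: local Spark is available).\n",
    "sc = SparkContext.getOrCreate()\n",
    "\n",
    "# A tiny pair RDD; the two pairs are copied from the output above.\n",
    "pairs = sc.parallelize([('Mazda RX4', 21.0), ('Datsun 710', 22.8)])\n",
    "\n",
    "# mapValues keeps one element per input pair; the value becomes a list.\n",
    "nested = pairs.mapValues(lambda x: [x, x * 10]).collect()\n",
    "\n",
    "# flatMapValues emits one (key, value) pair per item of the returned list.\n",
    "flat = pairs.flatMapValues(lambda x: [x, x * 10]).collect()\n",
    "\n",
    "print(nested)\n",
    "print(flat)"
   ]
  },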
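  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## RDD to DataFrame\n",
    "\n",
    "To convert an RDD to a DataFrame, we can use the `SparkSession.createDataFrame()` function. When no schema is passed, Spark infers the column names and types from the data, so each element of the RDD needs to carry its field names. Here **we make every element a Row object**.\n",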
    "\n",
    "Create an RDD"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[',mpg,cyl,disp,hp,drat,wt,qsec,vs,am,gear,carb',\n",
       " 'Mazda RX4,21,6,160,110,3.9,2.62,16.46,0,1,4,4',\n",
       " 'Mazda RX4 Wag,21,6,160,110,3.9,2.875,17.02,0,1,4,4',\n",
       " 'Datsun 710,22.8,4,108,93,3.85,2.32,18.61,1,1,4,1',\n",
       " 'Hornet 4 Drive,21.4,6,258,110,3.08,3.215,19.44,1,0,3,1']"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "rdd_raw = sc.textFile('../../data/mtcars.csv')\n",
    "rdd_raw.take(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Save the first row to a variable"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['model',\n",
       " 'mpg',\n",
       " 'cyl',\n",
       " 'disp',\n",
       " 'hp',\n",
       " 'drat',\n",
       " 'wt',\n",
       " 'qsec',\n",
       " 'vs',\n",
       " 'am',\n",
       " 'gear',\n",
       " 'carb']"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "header = rdd_raw.map(lambda x: x.split(',')).filter(lambda x: x[1] == 'mpg').collect()[0]\n",
    "header[0] = 'model'\n",
    "header"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Save the rest to a new RDD"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[['Mazda RX4',\n",
       "  '21',\n",
       "  '6',\n",
       "  '160',\n",
       "  '110',\n",
       "  '3.9',\n",
       "  '2.62',\n",
       "  '16.46',\n",
       "  '0',\n",
       "  '1',\n",
       "  '4',\n",
       "  '4'],\n",
       " ['Mazda RX4 Wag',\n",
       "  '21',\n",
       "  '6',\n",
       "  '160',\n",
       "  '110',\n",
       "  '3.9',\n",
       "  '2.875',\n",
       "  '17.02',\n",
       "  '0',\n",
       "  '1',\n",
       "  '4',\n",
       "  '4']]"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "rdd = rdd_raw.map(lambda x: x.split(',')).filter(lambda x: x[1] != 'mpg')\n",
    "rdd.take(2)"
   ]
  },
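  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The two steps above find the header by testing whether the second field equals `'mpg'`. An alternative sketch, which relies only on the header being the first line, uses `first()` and then filters that line out. It is shown on two lines copied from the raw output above so that it runs on its own; `lines`, `first_line`, `header_alt` and `body` are hypothetical names, and a local Spark installation is assumed."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark import SparkContext\n",
    "\n",
    "# Reuse the running SparkContext if there is one (assumption: local Spark is available).\n",
    "sc = SparkContext.getOrCreate()\n",
    "\n",
    "# Two lines copied from the raw file output above.\n",
    "lines = sc.parallelize([\n",
    "    ',mpg,cyl,disp,hp,drat,wt,qsec,vs,am,gear,carb',\n",
    "    'Mazda RX4,21,6,160,110,3.9,2.62,16.46,0,1,4,4'])\n",
    "\n",
    "# first() returns the first element of the RDD, here the header line.\n",
    "first_line = lines.first()\n",
    "header_alt = first_line.split(',')\n",
    "header_alt[0] = 'model'  # the first column has no name in the file\n",
    "\n",
    "# Keep every line that is not the header line, then split it into fields.\n",
    "body = lines.filter(lambda line: line != first_line).map(lambda line: line.split(','))\n",
    "print(header_alt)\n",
    "print(body.collect())"
   ]
  },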
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Convert RDD elements to Row objects"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "First we define a function that takes a list of column names and a list of values and creates a Row of key-value pairs. **Row() takes its fields as keyword arguments, so we can’t simply pass a dictionary to it as a single argument**. Instead, we treat the dictionary as a set of keyword arguments and unpack it with the `**` operator.\n",
    "\n",
    "See an example."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "Row(a=1, b=2, c=3)"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from pyspark.sql import Row\n",
    "my_dict = dict(zip(['a', 'b', 'c'], range(1, 4)))\n",
    "Row(**my_dict)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Let’s define the function."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "def list_to_row(keys, values):\n",
    "    row_dict = dict(zip(keys, values))\n",
    "    return Row(**row_dict)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Convert elements to Row objects"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(am='1', carb='4', cyl='6', disp='160', drat='3.9', gear='4', hp='110', model='Mazda RX4', mpg='21', qsec='16.46', vs='0', wt='2.62'),\n",
       " Row(am='1', carb='4', cyl='6', disp='160', drat='3.9', gear='4', hp='110', model='Mazda RX4 Wag', mpg='21', qsec='17.02', vs='0', wt='2.875'),\n",
       " Row(am='1', carb='1', cyl='4', disp='108', drat='3.85', gear='4', hp='93', model='Datsun 710', mpg='22.8', qsec='18.61', vs='1', wt='2.32')]"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "rdd_rows = rdd.map(lambda x: list_to_row(header, x))\n",
    "rdd_rows.take(3)"
   ]
  },
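  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Notice that the fields in the output above appear in alphabetical order rather than in the order of `header`. In Spark versions before 3.0, `Row` sorts keyword arguments by name; to our knowledge, Spark 3.0 and later keep the given order. If column order matters, one alternative is to build a Row class from the field names and call it with positional values. The sketch below illustrates that idea and is not part of the original workflow; `short_header`, `values`, `CarRow` and `car` are hypothetical names."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import Row\n",
    "\n",
    "# A shortened header and one matching record, copied from the data above.\n",
    "short_header = ['model', 'mpg', 'cyl']\n",
    "values = ['Mazda RX4', '21', '6']\n",
    "\n",
    "# Row(*names) returns a Row class with those field names, in the given order.\n",
    "CarRow = Row(*short_header)\n",
    "\n",
    "# Calling it with positional values fills the fields in the same order.\n",
    "car = CarRow(*values)\n",
    "print(car)"
   ]
  },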
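  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now we can convert the RDD to a DataFrame. Because every value came from splitting a line of text, all columns in the resulting DataFrame are strings."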
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+----+---+----+----+----+---+-----------------+----+-----+---+-----+\n",
      "| am|carb|cyl|disp|drat|gear| hp|            model| mpg| qsec| vs|   wt|\n",
      "+---+----+---+----+----+----+---+-----------------+----+-----+---+-----+\n",
      "|  1|   4|  6| 160| 3.9|   4|110|        Mazda RX4|  21|16.46|  0| 2.62|\n",
      "|  1|   4|  6| 160| 3.9|   4|110|    Mazda RX4 Wag|  21|17.02|  0|2.875|\n",
      "|  1|   1|  4| 108|3.85|   4| 93|       Datsun 710|22.8|18.61|  1| 2.32|\n",
      "|  0|   1|  6| 258|3.08|   3|110|   Hornet 4 Drive|21.4|19.44|  1|3.215|\n",
      "|  0|   2|  8| 360|3.15|   3|175|Hornet Sportabout|18.7|17.02|  0| 3.44|\n",
      "+---+----+---+----+----+----+---+-----------------+----+-----+---+-----+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df = spark.createDataFrame(rdd_rows)\n",
    "df.show(5)"
   ]
  },
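  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Every column in `df` holds strings, since the values were produced by splitting lines of text. A minimal sketch of one way to convert selected columns to numeric types with `Column.cast` follows. To run on its own it builds a two-row, all-string DataFrame by hand instead of reusing `df`; the names `df_str` and `df_num` are ours, and a local Spark installation is assumed."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession\n",
    "from pyspark.sql.functions import col\n",
    "\n",
    "# Reuse the active session if one exists (assumption: local Spark is available).\n",
    "spark = SparkSession.builder.getOrCreate()\n",
    "\n",
    "# A small all-string DataFrame shaped like `df` above.\n",
    "df_str = spark.createDataFrame(\n",
    "    [('Mazda RX4', '21', '6'), ('Datsun 710', '22.8', '4')],\n",
    "    ['model', 'mpg', 'cyl'])\n",
    "\n",
    "# Cast the numeric-looking columns; `model` stays a string.\n",
    "df_num = (df_str\n",
    "          .withColumn('mpg', col('mpg').cast('double'))\n",
    "          .withColumn('cyl', col('cyl').cast('int')))\n",
    "df_num.printSchema()"
   ]
  },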
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
